﻿/**
* CRL
*/
using CRL.Core;
using CRL.Core.Extension;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace CRL.RabbitMQ
{
    //https://www.cnblogs.com/sheng-jie/p/7192690.html
    //https://www.cnblogs.com/julyluo/p/6265775.html
    public abstract class AbsRabbitMQ : IDisposable
    {
        // Current broker connection; assigned by CreateConnect.
        protected IConnection connection;
        // Channels handed out by CreateConsumerChannel; disposed together in Dispose.
        protected List<IModel> consumerChannels = new List<IModel>();
        // Exchange name used by ExchangeDeclare, QueueBind and BasePublish.
        protected string __exchangeName = "";
        // Exchange type string passed to channel.ExchangeDeclare; supplied by the concrete subclass.
        protected abstract string MqExchangeType { get; }
        ///// <summary>
        ///// 自定义发送属性
        ///// default Persistent true
        ///// </summary>
        //protected virtual Action<IBasicProperties> BasicPropertiesFunc => (b) =>
        //{
        //    b.Persistent = true;
        //};
        /// <summary>
        /// Declares <paramref name="queueName"/> on <paramref name="channel"/> using the durable,
        /// exclusive, auto-delete and argument settings carried by <paramref name="option"/>.
        /// </summary>
        /// <param name="channel">Channel the declaration is issued on.</param>
        /// <param name="queueName">Name of the queue to declare.</param>
        /// <param name="option">Queue settings; a default <c>ConsumeOption</c> is used when null.</param>
        protected void QueueDeclare(IModel channel, string queueName, ConsumeOption option)
        {
            var settings = option ?? new ConsumeOption();

            // Extra broker arguments (for example "x-max-priority") travel in QueueDeclareArgs.
            channel.QueueDeclare(
                queueName,
                settings.QueueDurable,
                settings.QueueExclusive,
                settings.AutoDelete,
                settings.QueueDeclareArgs);
        }

        /// <summary>
        /// Declares the exchange named by <c>__exchangeName</c> with type <c>MqExchangeType</c>,
        /// taking durability, auto-delete and arguments from <paramref name="option"/>.
        /// </summary>
        /// <param name="channel">Channel the declaration is issued on.</param>
        /// <param name="option">Exchange settings; a default <c>ConsumeOption</c> is used when null.</param>
        protected void ExchangeDeclare(IModel channel, ConsumeOption option)
        {
            var settings = option ?? new ConsumeOption();

            channel.ExchangeDeclare(
                __exchangeName,
                MqExchangeType,
                settings.ExchangeDurable,
                settings.AutoDelete,
                settings.ExchangeDeclareArgs);
        }
        /// <summary>
        /// Binds <paramref name="queueName"/> to the exchange named by <c>__exchangeName</c> under
        /// <paramref name="routingKey"/>.
        /// </summary>
        /// <param name="channel">Channel the bind is issued on.</param>
        /// <param name="queueName">Queue to bind.</param>
        /// <param name="routingKey">Routing key for the binding.</param>
        /// <param name="option">
        /// When not null, its <c>RoutingKey</c> is overwritten with <paramref name="routingKey"/>;
        /// the consumer registration methods later read that value from the same instance.
        /// </param>
        /// <param name="args">Optional binding arguments forwarded to the broker.</param>
        protected void QueueBind(IModel channel, string queueName, string routingKey, ConsumeOption option, IDictionary<string, object> args = null)
        {
            // Record the key on the option first, then perform the bind (same order as before).
            var target = option ?? new ConsumeOption();
            target.RoutingKey = routingKey;

            channel.QueueBind(queueName, __exchangeName, routingKey, args);
        }
        /// <summary>
        /// True when a connection object exists and reports itself as open.
        /// </summary>
        public bool IsOpen => connection?.IsOpen ?? false;
        // Factory used by CreateConnect for every (re)connection.
        ConnectionFactory factory;
        // Pool of publish/admin channels created on the current connection.
        SimplePool<IModel> channelPool;
        // Configuration supplied by the caller; consulted for the sync/async consumer mode.
        ConnectionConfig _config;

        /// <summary>
        /// Builds the connection factory from <paramref name="config"/>, opens the first connection
        /// and prepares the channel pool.
        /// </summary>
        /// <param name="config">
        /// Connection settings. <c>HostName</c> may be written as "host:port"; the embedded port is
        /// used only when <c>Port</c> is 0. When neither supplies a port, 5672 is used.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
        public AbsRabbitMQ(ConnectionConfig config)
        {
            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }
            _config = config;
            var host = config.HostName;
            var port = config.Port;
            var parts = host.Split(':');
            // Previously the ":port" suffix was stripped only when Port == 0, so "host:5672" combined
            // with an explicit Port reached the factory as a host name containing a colon.
            // The suffix is now also stripped for the plain two-part "host:port" form; inputs with
            // more than one colon keep their earlier handling.
            if (parts.Length > 1 && (port == 0 || parts.Length == 2))
            {
                if (port == 0)
                {
                    port = Convert.ToInt32(parts[1]);
                }
                host = parts[0];
            }
            if (port == 0)
            {
                port = 5672;
            }

            factory = new ConnectionFactory
            {
                UserName = config.UserName,
                Password = config.Password,
                HostName = host,
                AutomaticRecoveryEnabled = true,
                NetworkRecoveryInterval = TimeSpan.FromSeconds(10),
                DispatchConsumersAsync = config.ConsumersAsync,
                Port = port,
                VirtualHost = config.VirtualHost,
            };
            CreateConnect();
            channelPool = new SimplePool<IModel>(() =>
            {
                if (disposeFromInner)
                {
                    throw new ObjectDisposedException(nameof(AbsRabbitMQ), "对象已释放");
                }
                // Make sure a live connection exists before a new channel is opened on it.
                if (!connection.IsOpen)
                {
                    TryConnect();
                }
                return connection.CreateModel();
            });
            // Log the endpoint actually used rather than the raw configuration values.
            Log($"登录为 {host}:{port}@{config.UserName}");
        }
        // Serializes reconnect attempts so only one thread rebuilds the connection at a time.
        object sync_root = new object();

        /// <summary>
        /// Blocks until the connection is open again, retrying <c>CreateConnect</c> with a growing
        /// delay (2 s up to 11 s, then starting over). Returns immediately once this instance has
        /// been disposed.
        /// </summary>
        public void TryConnect()
        {
            // A deliberate Dispose must not trigger a reconnect.
            if (disposeFromInner)
            {
                return;
            }
            lock (sync_root)
            {
                int attempt = 1;
                while (!IsOpen)
                {
                    // Dispose may be called while this loop is waiting; stop retrying in that case.
                    if (disposeFromInner)
                    {
                        return;
                    }
                    try
                    {
                        CreateConnect();
                    }
                    catch (Exception ero)
                    {
                        Log("Connection eror " + ero.Message);
                    }
                    // Do not sleep after a successful connect.
                    if (IsOpen)
                    {
                        break;
                    }
                    attempt++;
                    Thread.Sleep(1000 * attempt);
                    if (attempt > 10)
                    {
                        attempt = 1;
                    }
                }
            }
        }
        /// <summary>
        /// Opens a new connection from the factory, stores it in the <c>connection</c> field and
        /// subscribes to its shutdown, callback-exception and blocked events. Each event is logged
        /// and then handed to <c>TryConnect</c>.
        /// </summary>
        void CreateConnect()
        {
            // Shared reaction for all three connection events: log first, then let TryConnect
            // decide whether anything has to be rebuilt.
            void LogAndReconnect(string eventMessage)
            {
                Log(eventMessage);
                TryConnect();
            }

            connection = factory.CreateConnection();

            connection.ConnectionShutdown += (sender, args) => LogAndReconnect("RabbitMQ ConnectionShutdown");
            connection.CallbackException += (sender, args) => LogAndReconnect("RabbitMQ CallbackException");
            connection.ConnectionBlocked += (sender, args) => LogAndReconnect("RabbitMQ ConnectionBlocked");

            Log($"{factory.HostName} 连接成功");
        }
        /// <summary>
        /// Writes <paramref name="msg"/> to the console with a "RabbitMQ: " prefix and records the
        /// unprefixed text through <c>EventLog.Log</c> under the "RabbitMQ" category.
        /// </summary>
        /// <param name="msg">Text to log.</param>
        protected void Log(string msg)
        {
            const string category = "RabbitMQ";
            Console.WriteLine(category + ": " + msg);
            EventLog.Log(msg, category);
        }

        /// <summary>
        /// Publishes <paramref name="msgs"/> with the default message properties, which mark every
        /// message as persistent.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="routingKey">Routing key used for every message.</param>
        /// <param name="msgs">Messages to publish.</param>
        protected void BasePublish<T>(string routingKey, params T[] msgs)
            => BasePublish<T>(routingKey, properties => properties.Persistent = true, msgs);

        /// <summary>
        /// Publishes <paramref name="msgs"/> to the exchange named by <c>__exchangeName</c> on a
        /// pooled channel.
        /// </summary>
        /// <typeparam name="T">
        /// Message type. When the array itself is a <c>byte[]</c> it is sent as one raw message body;
        /// otherwise each element is serialized with <c>ToJson()</c> and sent as UTF-8.
        /// </typeparam>
        /// <param name="routingKey">Routing key used for every message.</param>
        /// <param name="basicPropertiesFunc">Optional callback that customizes the message properties.</param>
        /// <param name="msgs">Messages to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="msgs"/> is null.</exception>
        protected void BasePublish<T>(string routingKey, Action<IBasicProperties> basicPropertiesFunc, params T[] msgs)
        {
            if (msgs == null)
            {
                throw new ArgumentNullException(nameof(msgs));
            }

            var channel = channelPool.Rent();
            try
            {
                // Built inside the try so a failure here still returns the channel to the pool.
                var properties = channel.CreateBasicProperties();
                basicPropertiesFunc?.Invoke(properties);

                if (msgs.GetType() == typeof(byte[]))
                {
                    // A raw byte array is one message body, not a list of single-byte messages.
                    var rawBody = msgs as byte[];
                    channel.BasicPublish(__exchangeName, routingKey, properties, rawBody);
                    return;
                }
                foreach (var msg in msgs)
                {
                    var jsonBody = Encoding.UTF8.GetBytes(msg.ToJson());
                    channel.BasicPublish(__exchangeName, routingKey, properties, jsonBody);
                }
            }
            finally
            {
                // Exceptions propagate unchanged (no catch/rethrow that would reset the stack trace).
                channelPool.Return(channel);
            }
        }


        /// <summary>
        /// Starts a synchronous consumer that converts each message body to
        /// <typeparamref name="T"/> before invoking <paramref name="onReceive"/>.
        /// </summary>
        /// <typeparam name="T">
        /// Target type. <c>byte[]</c> receives the raw body; any other type is produced by decoding
        /// the body as UTF-8 and calling <c>ToObject</c> on the text.
        /// </typeparam>
        /// <param name="channel">Channel the consumer is attached to.</param>
        /// <param name="queueName">Queue to consume from.</param>
        /// <param name="onReceive">Callback receiving the converted message and its routing key.</param>
        /// <param name="option">Optional consumer settings, forwarded unchanged.</param>
        protected void BaseBeginConsumer<T>(IModel channel, string queueName, Action<T, string> onReceive, ConsumeOption option = null)
        {
            // The type test does not depend on the message, so it is evaluated once up front.
            var wantsRawBytes = typeof(T) == typeof(byte[]);

            Action<byte[], string> bodyHandler = (body, routingKey) =>
            {
                T payload = wantsRawBytes
                    ? (T)(object)body
                    : Encoding.UTF8.GetString(body).ToObject<T>();
                onReceive(payload, routingKey);
            };

            BaseBeginConsumerString(channel, queueName, bodyHandler, option);
        }
        // Failure counters used by checkNack, shared by every instance in the process.
        static ConcurrentDictionary<string, int> NackCache = new ConcurrentDictionary<string, int>();

        /// <summary>
        /// Returns the MD5 digest of <paramref name="toByte"/> as 32 upper-case hex characters.
        /// </summary>
        /// <param name="toByte">Bytes to hash.</param>
        /// <returns>Upper-case hexadecimal digest without separators.</returns>
        static string EncryptMD5(byte[] toByte)
        {
            // MD5.Create() replaces the obsolete MD5CryptoServiceProvider, and the using block
            // releases the hash object that was previously never disposed.
            using (var md5 = MD5.Create())
            {
                var digest = md5.ComputeHash(toByte);
                return BitConverter.ToString(digest).Replace("-", "");
            }
        }
        // Consumer options keyed by routing key. Written when a consumer is registered and read from
        // the consumer callbacks, hence a concurrent dictionary rather than a plain Dictionary.
        static ConcurrentDictionary<string, ConsumeOption> consumeOptionCache = new ConcurrentDictionary<string, ConsumeOption>();

        /// <summary>
        /// Handles a failed message: counts the failure and either requeues the message (while the
        /// count is within <paramref name="ConsumerRetryTimes"/>) or acknowledges it and gives up.
        /// </summary>
        /// <param name="channel">Channel the message was delivered on.</param>
        /// <param name="tuple">Delivery tag (Item1) and message body (Item2) of the failed message.</param>
        /// <param name="ConsumerRetryTimes">Maximum number of failures that still lead to a requeue.</param>
        /// <param name="basicNack">
        /// When false, no nack is sent while the count is within the limit; the failure is only counted.
        /// </param>
        void checkNack(IModel channel, Tuple<ulong, byte[]> tuple, int ConsumerRetryTimes, bool basicNack = true)
        {
            var deliveryTag = tuple.Item1;
            var body = tuple.Item2;

            // Delivery tags are scoped to a channel and a requeued message comes back with a new
            // tag, so a tag-based key never counts past 1 and the message would be retried forever.
            // The counter is therefore keyed by a hash of the body, which is stable across
            // redeliveries. Messages with identical bodies share one counter.
            var msgKey = body != null ? EncryptMD5(body) : deliveryTag.ToString();

            // Atomic increment; the previous read-then-write could lose updates between threads.
            var failures = NackCache.AddOrUpdate(msgKey, 1, (key, current) => current + 1);

            if (failures <= ConsumerRetryTimes)
            {
                if (basicNack)
                {
                    // Still within the retry budget: requeue, then pause briefly before continuing.
                    channel.BasicNack(deliveryTag, false, true);
                    Thread.Sleep(100);
                }
            }
            else
            {
                // Budget exhausted: forget the counter and ack so the broker drops the message.
                NackCache.TryRemove(msgKey, out var n2);
                channel.BasicAck(deliveryTag, false);
                Log($"消费{n2}次后失败,消息标记为ack {deliveryTag}");
            }
        }
        /// <summary>
        /// Attaches a synchronous consumer to <paramref name="queueName"/> that passes the raw body
        /// and routing key of each message to <paramref name="onReceive"/> and acknowledges manually.
        /// </summary>
        /// <param name="channel">Channel the consumer is attached to.</param>
        /// <param name="queueName">Queue to consume from.</param>
        /// <param name="onReceive">Callback receiving the message body and routing key.</param>
        /// <param name="option">
        /// Optional settings. When its <c>RoutingKey</c> is set, the option is cached under that key
        /// and looked up again by each message's routing key to obtain the retry count.
        /// </param>
        /// <exception cref="Exception">The connection was configured with <c>ConsumersAsync</c> = true.</exception>
        protected void BaseBeginConsumerString(IModel channel, string queueName, Action<byte[], string> onReceive, ConsumeOption option = null)
        {
            if (_config.ConsumersAsync)
            {
                throw new Exception("ConsumersAsync必须为false");
            }
            option = option ?? new ConsumeOption { IsAsync = false };
            option.ConsumerChannelFunc?.Invoke(channel);
            if (!string.IsNullOrEmpty(option.RoutingKey))
            {
                consumeOptionCache[option.RoutingKey] = option;
            }
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                consumeOptionCache.TryGetValue(ea.RoutingKey, out var _option);
                if (_option?.IsAsync == true)
                {
                    // The key is registered for an async consumer: put the message back.
                    channel.BasicNack(ea.DeliveryTag, false, true);
                    return;
                }
                try
                {
                    onReceive(ea.Body, ea.RoutingKey);
                    // Acknowledge only after the callback finished without throwing.
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                catch (Exception)
                {
                    var ConsumerRetryTimes = _option?.ConsumerRetryTimes;
                    if (ConsumerRetryTimes > 0)
                    {
                        // Retry configured: let checkNack decide between requeue and final ack.
                        checkNack(channel, new Tuple<ulong, byte[]>(ea.DeliveryTag, ea.Body), ConsumerRetryTimes.Value);
                    }
                    else
                    {
                        // No retry configured (or no cached option): drop the message.
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                    // Always surface the failure; "throw;" keeps the callback's original stack trace.
                    throw;
                }
            };
            if (option.LazyConsume)
            {
                // Consumption starts later, when ConfirmConsume is called.
                CacheConsume(channel, queueName, consumer);
            }
            else
            {
                channel.BasicConsume(queueName, false, consumer);
            }
        }
        /// <summary>
        /// Attaches an asynchronous consumer to <paramref name="queueName"/> that awaits
        /// <paramref name="onReceive"/> for each message and acknowledges manually.
        /// </summary>
        /// <param name="channel">Channel the consumer is attached to.</param>
        /// <param name="queueName">Queue to consume from.</param>
        /// <param name="onReceive">Async callback receiving the message body and routing key.</param>
        /// <param name="option">
        /// Optional settings; <c>IsAsync</c> is forced to true. When its <c>RoutingKey</c> is set, the
        /// option is cached under that key and looked up again by each message's routing key.
        /// </param>
        /// <exception cref="Exception">The connection was configured with <c>ConsumersAsync</c> = false.</exception>
        protected void BaseBeginConsumerAsync(IModel channel, string queueName, Func<byte[], string, Task> onReceive, ConsumeOption option = null)
        {
            if (!_config.ConsumersAsync)
            {
                throw new Exception("ConsumersAsync必须为true");
            }
            option = option ?? new ConsumeOption { IsAsync = true };
            option.IsAsync = true;
            option.ConsumerChannelFunc?.Invoke(channel);
            if (!string.IsNullOrEmpty(option.RoutingKey))
            {
                consumeOptionCache[option.RoutingKey] = option;
            }
            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                consumeOptionCache.TryGetValue(ea.RoutingKey, out var _option);
                // Reject only when the key is explicitly registered for a synchronous consumer.
                // A missing entry (no option passed, empty RoutingKey, or a message key that differs
                // from the binding key) used to be nack-requeued as well, which bounced the message
                // back to this same consumer without end.
                if (_option != null && _option.IsAsync != true)
                {
                    channel.BasicNack(ea.DeliveryTag, false, true);
                    return;
                }
                try
                {
                    await onReceive.Invoke(ea.Body, ea.RoutingKey);
                    // Acknowledge only after the callback completed without throwing.
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                catch (Exception)
                {
                    var ConsumerRetryTimes = _option?.ConsumerRetryTimes;
                    if (ConsumerRetryTimes > 0)
                    {
                        // Retry configured: let checkNack decide between requeue and final ack.
                        checkNack(channel, new Tuple<ulong, byte[]>(ea.DeliveryTag, ea.Body), ConsumerRetryTimes.Value);
                    }
                    else
                    {
                        // No retry configured (or no cached option): drop the message.
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                    // Always surface the failure; "throw;" keeps the callback's original stack trace.
                    throw;
                }
            };
            if (option.LazyConsume)
            {
                // Consumption starts later, when ConfirmConsume is called.
                CacheConsume(channel, queueName, consumer);
            }
            else
            {
                channel.BasicConsume(queueName, false, consumer);
            }
        }
        /// <summary>
        /// Opens a channel for consuming on the current connection (reconnecting first when the
        /// connection is closed), logs its callback exceptions and registers it in
        /// <c>consumerChannels</c>.
        /// </summary>
        /// <returns>The newly created channel.</returns>
        protected IModel CreateConsumerChannel()
        {
            if (connection.IsOpen == false)
            {
                TryConnect();
            }

            var consumerChannel = connection.CreateModel();
            // Exceptions raised by consumer callbacks are only logged; the channel is kept.
            consumerChannel.CallbackException += (source, failure) => Log("CallbackException " + failure.Exception);

            consumerChannels.Add(consumerChannel);
            return consumerChannel;
        }
        // Set once Dispose has run. Read from the reconnect path and the channel-pool factory,
        // which may run on other threads, hence volatile.
        volatile bool disposeFromInner;

        /// <summary>
        /// Disposes the consumer channels, the channel pool and the connection. Calling it more than
        /// once has no further effect.
        /// </summary>
        public void Dispose()
        {
            if (disposeFromInner)
            {
                return;
            }
            // Raise the flag first so the shutdown event fired by the teardown below does not
            // start a reconnect.
            disposeFromInner = true;
            foreach (var c in consumerChannels)
            {
                c?.Dispose();
            }
            channelPool?.Dispose();
            connection?.Dispose();
        }
        /// <summary>
        /// Calls <c>QueueDelete</c> for <paramref name="queue"/> on a pooled channel and returns the
        /// value it reports.
        /// </summary>
        /// <param name="queue">Name of the queue.</param>
        /// <returns>The count returned by <c>QueueDelete</c>.</returns>
        /// <remarks>
        /// NOTE(review): the method name suggests a purge, but <c>QueueDelete</c> removes the queue
        /// itself — confirm this is intended.
        /// </remarks>
        public long CleanQueue(string queue)
        {
            var channel = channelPool.Rent();
            try
            {
                return channel.QueueDelete(queue);
            }
            finally
            {
                // Hand the channel back even when the broker call throws.
                channelPool.Return(channel);
            }
        }
        /// <summary>
        /// Returns the message count that the broker reports for <paramref name="queue"/>.
        /// </summary>
        /// <param name="queue">Name of the queue.</param>
        /// <returns>The value returned by <c>MessageCount</c>.</returns>
        public long GetQueueLength(string queue)
        {
            var channel = channelPool.Rent();
            try
            {
                return channel.MessageCount(queue);
            }
            finally
            {
                // Hand the channel back even when the broker call throws.
                channelPool.Return(channel);
            }
        }
        #region 手动延迟订阅
        // Consumers registered with LazyConsume that have not been started yet:
        // (channel, queue name, consumer).
        List<Tuple<IModel, string, IBasicConsumer>> consumes = new List<Tuple<IModel, string, IBasicConsumer>>();

        /// <summary>
        /// Remembers a consumer whose start is deferred until <see cref="ConfirmConsume"/> is called.
        /// </summary>
        /// <param name="channel">Channel the consumer belongs to.</param>
        /// <param name="queueName">Queue the consumer will read from.</param>
        /// <param name="consume">Consumer to start later.</param>
        internal void CacheConsume(IModel channel, string queueName, IBasicConsumer consume)
        {
            consumes.Add(new Tuple<IModel, string, IBasicConsumer>(channel, queueName, consume));
        }

        /// <summary>
        /// Starts every deferred consumer with manual acknowledgement. Consumers that were started
        /// are removed from the pending list, so a second call does not register them again.
        /// </summary>
        public void ConfirmConsume()
        {
            // Number of leading entries started successfully; kept outside the loop so the finally
            // block can drop exactly those and leave the rest pending if BasicConsume throws.
            var started = 0;
            try
            {
                for (; started < consumes.Count; started++)
                {
                    var pending = consumes[started];
                    var channel = pending.Item1;
                    var queueName = pending.Item2;
                    var consumer = pending.Item3;
                    channel.BasicConsume(queueName, false, consumer);
                }
            }
            finally
            {
                consumes.RemoveRange(0, started);
            }
        }
        #endregion
    }
}
